富函数(Rich Functions) 您所在的位置:网站首页 flink richfunction中open方法调用 富函数(Rich Functions)

富函数(Rich Functions)

2023-11-26 08:59| 来源: 网络整理| 查看: 265

“富函数”是DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。  RichMapFunction  RichFlatMapFunction  RichFilterFunction Rich Function有一个生命周期的概念。典型的生命周期方法有:  open()方法是rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用。  close()方法是生命周期中的最后一个调用的方法,做一些清理工作。  getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态

package com.atguigu.apiTest import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.configuration.Configuration object RichFunction { def main(args: Array[String]): Unit = { } } class MyMapFunction extends RichMapFunction[Double, Int]{ //ctr + O 查看重写方法 override def map(value: Double): Int = { value.toInt - 1 } override def open(parameters: Configuration): Unit = super.open(parameters) override def close(): Unit = super.close() } class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] { //子任务的index var subTaskIndex = 0 override def open(configuration: Configuration): Unit = { subTaskIndex = getRuntimeContext.getIndexOfThisSubtask // 以下可以做一些初始化工作,例如建立一个和HDFS的连接 } override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = { if (in % 2 == subTaskIndex) { out.collect((subTaskIndex, in)) } } override def close(): Unit = { // 以下做一些清理工作,例如断开和HDFS的连接。 } }


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

      专题文章
        CopyRight 2018-2019 实验室设备网 版权所有